home *** CD-ROM | disk | FTP | other *** search
- # Source Generated with Decompyle++
- # File: in.pyc (Python 2.6)
-
- '''Functions to manage apport problem report files.
-
- Copyright (C) 2006 Canonical Ltd.
- Author: Martin Pitt <martin.pitt@ubuntu.com>
-
- This program is free software; you can redistribute it and/or modify it
- under the terms of the GNU General Public License as published by the
- Free Software Foundation; either version 2 of the License, or (at your
- option) any later version. See http://www.gnu.org/copyleft/gpl.html for
- the full text of the license.
- '''
- import os
- import glob
- import subprocess
- import os.path as os
- import ConfigParser
- from problem_report import ProblemReport
- from packaging_impl import impl as packaging
- report_dir = os.environ.get('APPORT_REPORT_DIR', '/var/crash')
- _config_file = '~/.config/apport/settings'
-
- def find_package_desktopfile(package):
- '''If given package is installed and has a single .desktop file, return the
- path to it, otherwise return None.'''
- if package is None:
- return None
- desktopfile = None
- for line in packaging.get_files(package):
- if line.endswith('.desktop'):
- if desktopfile:
- return None
- desktopfile = line
- continue
- desktopfile
-
- return desktopfile
-
-
- def likely_packaged(file):
- '''Check whether the given file is likely to belong to a package.
-
- This is semi-decidable: A return value of False is definitive, a True value
- is only a guess which needs to be checked with find_file_package().
- However, this function is very fast and does not access the package
- database.'''
- pkg_whitelist = [
- '/bin/',
- '/boot',
- '/etc/',
- '/initrd',
- '/lib',
- '/sbin/',
- '/usr/',
- '/var']
- whitelist_match = False
- for i in pkg_whitelist:
- if file.startswith(i):
- whitelist_match = True
- break
- continue
-
- if whitelist_match and not file.startswith('/usr/local/'):
- pass
- return not file.startswith('/var/lib/schroot')
-
-
- def find_file_package(file):
- '''Return the package that ships the given file (or None if no package
- ships it).'''
- (dir, name) = os.path.split(file)
- resolved_dir = os.path.realpath(dir)
- if os.path.isdir(resolved_dir):
- file = os.path.join(resolved_dir, name)
-
- if not likely_packaged(file):
- return None
- return packaging.get_file_package(file)
-
-
- def seen_report(report):
- '''Check whether the given report file has already been processed
- earlier.'''
- st = os.stat(report)
- if not st.st_atime > st.st_mtime:
- pass
- return st.st_size == 0
-
-
- def mark_report_seen(report):
- '''Mark given report file as seen.'''
- st = os.stat(report)
-
- try:
- os.utime(report, (st.st_mtime, st.st_mtime - 1))
- except OSError:
- timeout = 12
- while timeout > 0:
- f = open(report)
- f.read(1)
- f.close()
-
- try:
- st = os.stat(report)
- except OSError:
- return None
-
- if st.st_atime > st.st_mtime:
- break
-
- time.sleep(0.1)
- timeout -= 1
- if timeout == 0:
- delete_report(report)
-
- except:
- timeout == 0
-
-
-
- def get_all_reports():
- '''Return a list with all report files which are accessible to the calling
- user.'''
- reports = []
- for r in glob.glob(os.path.join(report_dir, '*.crash')):
-
- try:
- if os.path.getsize(r) > 0 and os.access(r, os.R_OK):
- reports.append(r)
- continue
- except OSError:
- continue
-
-
-
- return reports
-
-
- def get_new_reports():
- '''Return a list with all report files which have not yet been processed
- and are accessible to the calling user.'''
- return _[1]
-
-
- def get_all_system_reports():
- '''Return a list with all report files which belong to a system user (i. e.
- uid < 500 according to LSB).'''
- reports = []
- for r in glob.glob(os.path.join(report_dir, '*.crash')):
-
- try:
- if os.path.getsize(r) > 0 and os.stat(r).st_uid < 500:
- reports.append(r)
- continue
- except OSError:
- continue
-
-
-
- return reports
-
-
- def get_new_system_reports():
- '''Return a list with all report files which have not yet been processed
- and belong to a system user (i. e. uid < 500 according to LSB).'''
- return _[1]
-
-
- def delete_report(report):
- '''Delete the given report file.
-
- If unlinking the file fails due to a permission error (if report_dir is not
- writable to normal users), the file will be truncated to 0 bytes instead.'''
-
- try:
- os.unlink(report)
- except OSError:
- open(report, 'w').truncate(0)
-
-
-
- def get_recent_crashes(report):
- '''Return the number of recent crashes for the given report file.
-
- Return the number of recent crashes (currently, crashes which happened more
- than 24 hours ago are discarded).'''
- pr = ProblemReport()
- pr.load(report, False)
-
- try:
- count = int(pr['CrashCounter'])
- report_time = time.mktime(time.strptime(pr['Date']))
- cur_time = time.mktime(time.localtime())
- if cur_time - report_time > 86400:
- return 0
- return count
- except (ValueError, KeyError):
- return 0
-
-
-
- def make_report_path(report, uid = None):
- '''Construct a canonical pathname for the given report.
-
- If uid is not given, it defaults to the uid of the current process.'''
- if report.has_key('ExecutablePath'):
- subject = report['ExecutablePath'].replace('/', '_')
- elif report.has_key('Package'):
- subject = report['Package'].split(None, 1)[0]
- else:
- raise ValueError, 'report has neither ExecutablePath nor Package attribute'
- if not report.has_key('ExecutablePath'):
- uid = os.getuid()
-
- return os.path.join(report_dir, '%s.%i.crash' % (subject, uid))
-
-
- def check_files_md5(sumfile):
- """Given a list of MD5 sums in md5sum(1) format (relative to /), check
- integrity of all files and return a list of files that don't match."""
- if not os.path.exists(sumfile):
- raise AssertionError
- m = subprocess.Popen([
- '/usr/bin/md5sum',
- '-c',
- sumfile], stdout = subprocess.PIPE, stderr = subprocess.PIPE, close_fds = True, cwd = '/', env = { })
- out = m.communicate()[0]
- if m.returncode == 0:
- return []
- mismatches = []
- for l in out.splitlines():
- if l.endswith('FAILED'):
- mismatches.append(l.rsplit(':', 1)[0])
- continue
- m.returncode == 0
-
- return mismatches
-
-
- def get_config(section, setting, default = None, bool = False):
- '''Return a setting from user configuration.
-
- This is read from ~/.config/apport/settings. If bool is True, the value is
- interpreted as a boolean.
- '''
- if not get_config.config:
- get_config.config = ConfigParser.ConfigParser()
- get_config.config.read(os.path.expanduser(_config_file))
-
-
- try:
- if bool:
- return get_config.config.getboolean(section, setting)
- return get_config.config.get(section, setting)
- except (ConfigParser.NoOptionError, ConfigParser.NoSectionError):
- return default
-
-
- get_config.config = None
- import unittest
- import tempfile
- import os
- import shutil
- import sys
- import time
- from cStringIO import StringIO
-
- class _ApportUtilsTest(unittest.TestCase):
-
- def setUp(self):
- global report_dir
- self.orig_report_dir = report_dir
- report_dir = tempfile.mkdtemp()
- self.orig_config_file = _config_file
-
-
- def tearDown(self):
- global report_dir, _config_file
- shutil.rmtree(report_dir)
- report_dir = self.orig_report_dir
- self.orig_report_dir = None
- _config_file = self.orig_config_file
-
-
- def _create_reports(self, create_inaccessible = False):
- '''Create some test reports.'''
- r1 = os.path.join(report_dir, 'rep1.crash')
- r2 = os.path.join(report_dir, 'rep2.crash')
- open(r1, 'w').write('report 1')
- open(r2, 'w').write('report 2')
- os.chmod(r1, 384)
- os.chmod(r2, 384)
- if create_inaccessible:
- ri = os.path.join(report_dir, 'inaccessible.crash')
- open(ri, 'w').write('inaccessible')
- os.chmod(ri, 0)
- return [
- r1,
- r2,
- ri]
- return [
- r1,
- r2]
-
-
- def test_find_package_desktopfile(self):
- '''find_package_desktopfile().'''
- nodesktop = 'bash'
- if not [](_[1]) == 0:
- raise AssertionError
- onedesktop = None
- multidesktop = None
- for d in os.listdir('/usr/share/applications/'):
- pkg = packaging.get_file_package(os.path.join('/usr/share/applications/', d))
- num = [](_[2])
- if not onedesktop and num == 1:
- onedesktop = pkg
- elif not multidesktop and num > 1:
- multidesktop = pkg
-
- if onedesktop and multidesktop:
- break
- continue
- len if not d.endswith('.desktop') else []
-
- if nodesktop:
- self.assertEqual(find_package_desktopfile(nodesktop), None, 'no-desktop package %s' % nodesktop)
-
- if multidesktop:
- self.assertEqual(find_package_desktopfile(multidesktop), None, 'multi-desktop package %s' % multidesktop)
-
- if onedesktop:
- d = find_package_desktopfile(onedesktop)
- self.assertNotEqual(d, None, 'one-desktop package %s' % onedesktop)
- self.assert_(os.path.exists(d))
- self.assert_(d.endswith('.desktop'))
-
-
-
- def test_likely_packaged(self):
- '''likely_packaged().'''
- self.assertEqual(likely_packaged('/bin/bash'), True)
- self.assertEqual(likely_packaged('/usr/bin/foo'), True)
- self.assertEqual(likely_packaged('/usr/local/bin/foo'), False)
- self.assertEqual(likely_packaged('/home/test/bin/foo'), False)
- self.assertEqual(likely_packaged('/tmp/foo'), False)
- self.assertEqual(likely_packaged('/var/lib/foo'), True)
- self.assertEqual(likely_packaged('/var/lib/schroot/bin/bash'), False)
-
-
- def test_find_file_package(self):
- '''find_file_package().'''
- self.assertEqual(find_file_package('/bin/bash'), 'bash')
- self.assertEqual(find_file_package('/bin/cat'), 'coreutils')
- self.assertEqual(find_file_package('/nonexisting'), None)
-
-
- def test_seen(self):
- '''get_new_reports() and seen_report().'''
- self.assertEqual(get_new_reports(), [])
- self.assertEqual(set(get_new_reports()), set(tr))
- nr = set(tr)
- for r in tr:
- self.assertEqual(seen_report(r), False)
- nr.remove(r)
- mark_report_seen(r)
- self.assertEqual(seen_report(r), True)
- self.assertEqual(set(get_new_reports()), nr)
-
-
-
- def test_get_all_reports(self):
- '''get_all_reports().'''
- self.assertEqual(get_all_reports(), [])
- self.assertEqual(set(get_all_reports()), set(tr))
- for r in tr:
- mark_report_seen(r)
-
- self.assertEqual(set(get_all_reports()), set(tr))
-
-
- def test_get_system_reports(self):
- '''get_all_system_reports() and get_new_system_reports().'''
- self.assertEqual(get_all_reports(), [])
- self.assertEqual(get_all_system_reports(), [])
-
-
- def test_delete_report(self):
- '''delete_report().'''
- tr = self._create_reports()
- while tr:
- self.assertEqual(set(get_all_reports()), set(tr))
- delete_report(tr.pop())
-
-
- def test_get_recent_crashes(self):
- '''get_recent_crashes().'''
- r = StringIO('ProblemType: Crash')
- self.assertEqual(get_recent_crashes(r), 0)
- r = StringIO('ProblemType: Crash\nDate: Wed Aug 01 00:00:01 1990')
- self.assertEqual(get_recent_crashes(r), 0)
- r = StringIO('ProblemType: Crash\nDate: Wed Aug 01 00:00:01 1990\nCrashCounter: 3')
- self.assertEqual(get_recent_crashes(r), 0)
- r = StringIO('ProblemType: Crash\nDate: %s\nCrashCounter: 3' % time.ctime(time.mktime(time.localtime()) - 90000))
- self.assertEqual(get_recent_crashes(r), 0)
- r = StringIO('ProblemType: Crash\nDate: %s\nCrashCounter: 3' % time.ctime(time.mktime(time.localtime()) - 3600))
- self.assertEqual(get_recent_crashes(r), 3)
-
-
- def test_make_report_path(self):
- '''make_report_path().'''
- pr = ProblemReport()
- self.assertRaises(ValueError, make_report_path, pr)
- pr['Package'] = 'bash 1'
- self.assert_(make_report_path(pr).startswith('%s/bash' % report_dir))
- pr['ExecutablePath'] = '/bin/bash'
- self.assert_(make_report_path(pr).startswith('%s/_bin_bash' % report_dir))
-
-
- def test_check_files_md5(self):
- '''check_files_md5().'''
- f1 = os.path.join(report_dir, 'test 1.txt')
- f2 = os.path.join(report_dir, 'test:2.txt')
- sumfile = os.path.join(report_dir, 'sums.txt')
- open(f1, 'w').write('Some stuff')
- open(f2, 'w').write('More stuff')
- open(sumfile, 'w').write('2e41290da2fa3f68bd3313174467e3b5 %s\nf6423dfbc4faf022e58b4d3f5ff71a70 %s\n' % (f1[1:], f2))
- self.assertEqual(check_files_md5(sumfile), [], 'correct md5sums')
- open(f1, 'w').write('Some stuff!')
- self.assertEqual(check_files_md5(sumfile), [
- f1[1:]], 'file 1 wrong')
- open(f2, 'w').write('More stuff!')
- self.assertEqual(check_files_md5(sumfile), [
- f1[1:],
- f2], 'files 1 and 2 wrong')
- open(f1, 'w').write('Some stuff')
- self.assertEqual(check_files_md5(sumfile), [
- f2], 'file 2 wrong')
-
-
- def test_get_config(self):
- '''get_config().'''
- global _config_file, _config_file
- _config_file = '/nonexisting'
- self.assertEqual(get_config('main', 'foo'), None)
- self.assertEqual(get_config('main', 'foo', 'moo'), 'moo')
- get_config.config = None
- f = tempfile.NamedTemporaryFile()
- _config_file = f.name
- self.assertEqual(get_config('main', 'foo'), None)
- self.assertEqual(get_config('main', 'foo', 'moo'), 'moo')
- get_config.config = None
- f.write('[main]\none=1\ntwo = TWO\nb1 = 1\nb2=False\n[spethial]\none= 99\n')
- f.flush()
- self.assertEqual(get_config('main', 'foo'), None)
- self.assertEqual(get_config('main', 'foo', 'moo'), 'moo')
- self.assertEqual(get_config('main', 'one'), '1')
- self.assertEqual(get_config('main', 'one', default = 'moo'), '1')
- self.assertEqual(get_config('main', 'two'), 'TWO')
- self.assertEqual(get_config('main', 'b1', bool = True), True)
- self.assertEqual(get_config('main', 'b2', bool = True), False)
- self.assertEqual(get_config('main', 'b3', bool = True), None)
- self.assertEqual(get_config('main', 'b3', default = False, bool = True), False)
- self.assertEqual(get_config('spethial', 'one'), '99')
- self.assertEqual(get_config('spethial', 'two'), None)
- self.assertEqual(get_config('spethial', 'one', 'moo'), '99')
- self.assertEqual(get_config('spethial', 'nope', 'moo'), 'moo')
- get_config.config = None
- f.close()
-
-
- if __name__ == '__main__':
- unittest.main()
-
-